package com.softwin.cryptocurrency.demo.twoarbitrage;
import lombok.*;

import com.softwin.cryptocurrency.demo.security.JwtCallCredential;
import com.softwin.grpc.common.Common;
import com.softwin.marketdata.gateway.service.MarketDataManageGrpc;
import com.softwin.marketdata.gateway.service.MarketGateway;
import com.softwin.order.reporting.service.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by Huai weicheng on 2018/08/01.
 */


public class TwoArbitrage {
    /**
     * Best bid / best ask (level 1) snapshot of one exchange's order book.
     *
     * <p>Accessors, {@code toString} and the all-args constructor are generated by Lombok.
     * {@code equals} and {@code hashCode} are written by hand: two snapshots are equal when they
     * belong to the same exchange and their prices and quantities are numerically equal
     * (scale is ignored, so {@code 1.0} equals {@code 1.00}). The timestamp is not part of
     * equality.
     */
    @Data
    @AllArgsConstructor
    private class BidAsk1 {

        /** Exchange this snapshot was received from. */
        private String exchange;

        private BigDecimal bid1Price;
        private BigDecimal bid1Quantity;

        private BigDecimal ask1Price;
        private BigDecimal ask1Quantity;

        /** Timestamp carried by the depth message (passed to {@code new Date(long)} when recorded). */
        private long timestamp;

        /**
         * Compares exchange, prices and quantities; numeric fields are compared with
         * {@link BigDecimal#compareTo} so that differing scales do not make snapshots unequal.
         *
         * @param obj object to compare with, may be {@code null}
         * @return {@code true} if {@code obj} is a {@code BidAsk1} with the same exchange and
         *         numerically equal prices and quantities
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof BidAsk1)) {
                return false;
            }
            BidAsk1 other = (BidAsk1) obj;
            return exchange.equals(other.exchange)
                    && bid1Price.compareTo(other.bid1Price) == 0
                    && bid1Quantity.compareTo(other.bid1Quantity) == 0
                    && ask1Price.compareTo(other.ask1Price) == 0
                    && ask1Quantity.compareTo(other.ask1Quantity) == 0;
        }

        /**
         * Hash code consistent with {@link #equals(Object)}: numeric fields are hashed by value,
         * not by textual representation, so {@code 1.0} and {@code 1.00} hash identically.
         */
        @Override
        public int hashCode() {
            int result = exchange.hashCode();
            result = 31 * result + numericHash(bid1Price);
            result = 31 * result + numericHash(bid1Quantity);
            result = 31 * result + numericHash(ask1Price);
            result = 31 * result + numericHash(ask1Quantity);
            return result;
        }

        /** Scale-independent hash of a BigDecimal; every representation of zero hashes to 0. */
        private int numericHash(BigDecimal value) {
            if (value.signum() == 0) {
                return 0;
            }
            return value.stripTrailingZeros().hashCode();
        }

    }

    /**
     * A pair of exchanges (base leg / quote leg) quoted against each other.
     *
     * <p>Each leg holds the latest {@link BidAsk1} received for its exchange. All derived values
     * return {@code null} until both legs have a snapshot.
     */
    @Data
    private class ComboSymbol {
        private String baseExchange;
        private String quoteExchange;
        private BidAsk1 baseBidAsk1;
        private BidAsk1 quoteBidAsk1;

        public ComboSymbol(String baseExchange, String quoteExchange) {
            this.baseExchange = baseExchange;
            this.quoteExchange = quoteExchange;
        }

        /** True once a snapshot has been stored for both legs. */
        private boolean hasBothLegs() {
            return baseBidAsk1 != null && quoteBidAsk1 != null;
        }

        /** Renders a value with five decimal places, rounding half-even. */
        private String fiveDecimals(BigDecimal value) {
            return value.setScale(5, RoundingMode.HALF_EVEN).toString();
        }

        /** @return base bid1 price minus quote ask1 price, or {@code null} if a leg is missing */
        public BigDecimal getBuySpread() {
            if (!hasBothLegs()) {
                return null;
            }
            BigDecimal baseBid = baseBidAsk1.getBid1Price();
            BigDecimal quoteAsk = quoteBidAsk1.getAsk1Price();
            return baseBid.subtract(quoteAsk);
        }

        /** @return base ask1 price minus quote bid1 price, or {@code null} if a leg is missing */
        public BigDecimal getSellSpread() {
            if (!hasBothLegs()) {
                return null;
            }
            BigDecimal baseAsk = baseBidAsk1.getAsk1Price();
            BigDecimal quoteBid = quoteBidAsk1.getBid1Price();
            return baseAsk.subtract(quoteBid);
        }

        /** @return the smaller of base bid1 quantity and quote ask1 quantity, or {@code null} */
        public BigDecimal getBuyQuantity() {
            if (!hasBothLegs()) {
                return null;
            }
            BigDecimal baseBidQty = baseBidAsk1.getBid1Quantity();
            BigDecimal quoteAskQty = quoteBidAsk1.getAsk1Quantity();
            return baseBidQty.min(quoteAskQty);
        }

        /** @return the smaller of base ask1 quantity and quote bid1 quantity, or {@code null} */
        public BigDecimal getSellQuantity() {
            if (!hasBothLegs()) {
                return null;
            }
            BigDecimal baseAskQty = baseBidAsk1.getAsk1Quantity();
            BigDecimal quoteBidQty = quoteBidAsk1.getBid1Quantity();
            return baseAskQty.min(quoteBidQty);
        }

        /**
         * Stores {@code bidAsk1} on the leg whose exchange matches {@code exchange}; the base leg
         * is checked first. Does nothing when neither leg matches.
         */
        public void update(String exchange, BidAsk1 bidAsk1) {
            if (baseExchange.equals(exchange)) {
                baseBidAsk1 = bidAsk1;
                return;
            }
            if (quoteExchange.equals(exchange)) {
                quoteBidAsk1 = bidAsk1;
            }
        }

        @Override
        public String toString() {
            String pairLabel = baseExchange + " - " + quoteExchange;
            if (!hasBothLegs()) {
                return pairLabel + "组合行情未形成";
            }
            return pairLabel
                    + " 买价 = " + fiveDecimals(getBuySpread())
                    + " 卖价 = " + fiveDecimals(getSellSpread())
                    + " 买量 = " + fiveDecimals(getBuyQuantity())
                    + " 卖量 = " + fiveDecimals(getSellQuantity());
        }
    }

    /**
     * Bookkeeping record for one two-legged order: the combo symbol it was placed on, one
     * client-side reference and one status string per leg, and the direction of the combo.
     *
     * <p>Accessors and the all-args constructor are generated by Lombok; the constructor takes
     * the fields in declaration order.
     */
    @Data
    @AllArgsConstructor
    private class ComboOrder {
        // Exchange pair the two legs were placed on.
        ComboSymbol symbol;
        // Reference returned by insertOrder for the first (base-exchange) leg.
        String orderRef1;
        // Reference returned by insertOrder for the second (quote-exchange) leg.
        String orderRef2;
        // Status of the first leg: created as "sendout", set to "submitted" on a successful response.
        String status1;
        // Status of the second leg: created as "sendout", set to "submitted" on a successful response.
        String status2;
        // Direction passed to insertPairOrders ("buy" or "sell").
        String direction;

    }

    private String host = "192.168.1.162"; // server address
    private int mgport = 8991; // port of the market-data gateway channel
    private int tgport = 8993; // port of the trading gateway channel (also used for login and queries)
    private String username = "testTrader"; // trader user name
    private String password = "66666"; // trader password
    private String baseCurrency = "btc"; // base currency of the traded pair
    private String quoteCurrency = "usdt"; // quote currency of the traded pair

    private String quantity = "0.0001"; // order quantity
    // A trade is meant to be triggered once the ask1/bid1 difference reaches one gap.
    // NOTE(review): not referenced by the code visible in this file; handleMd uses literal thresholds.
    private BigDecimal gap = new BigDecimal("5");

    private List<String> exchanges = Arrays.asList("huobi", "binance", "okex"); // exchanges
    private Map<String, Common.Symbol> symbolMap = new HashMap<>(); // exchange -> traded symbol
    private Map<String, String> subaccountMap = new HashMap<>(); // exchange -> subaccount code
    private Map<String, Map<String, BigDecimal>> lastCurrencyMap = new HashMap<>(); // exchange -> (asset -> latest balance)
    private Map<String, BidAsk1> bidAsk1Map = new HashMap<>(); // exchange -> latest bid1/ask1
    private Map<String, ComboSymbol> comboSymbolMap = new HashMap<>(); // "base/quote" exchange pair -> latest combo symbol

    // Rows written to the CSV file: a header row added by initRecord, then one row per record() call.
    // NOTE(review): appended from stream callbacks and read by main; plain ArrayList, confirm threading is acceptable.
    public List<List<String>> records = new ArrayList<>();

    private OrderReportingGrpc.OrderReportingStub tgStub; // trading gateway
    private MarketDataManageGrpc.MarketDataManageStub mgStub; // market-data gateway
    private QueryServiceGrpc.QueryServiceStub queryStub; // query gateway
    private QueryServiceGrpc.QueryServiceBlockingStub queryBlockingStub ; // query gateway (blocking)

    private List<ComboOrder> comboOrders= new ArrayList<>(); // combo orders sent so far; statuses are updated when insert responses arrive

    /**
     * Entry point: builds the strategy (which connects and subscribes in its constructor),
     * lets it collect data for one minute, then dumps the collected rows to {@code data.csv}.
     *
     * @param args unused
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        final long collectionWindowMillis = 60L * 1000L;
        final TwoArbitrage strategy = new TwoArbitrage();
        Thread.sleep(collectionWindowMillis);
        Array2CSV(strategy.records, "data.csv");
    }

    /**
     * Connects to the gateways, loads the starting state and starts the subscriptions.
     * <ol>
     *   <li>Opens plaintext channels to the market-data gateway and the trading gateway.</li>
     *   <li>Logs in over the trading channel and attaches the returned token to the trading and
     *       query stubs.</li>
     *   <li>Builds one symbol per configured exchange.</li>
     *   <li>Queries subaccounts and stores, for each configured exchange, the free balances of
     *       the base and quote currency (zero when absent) and the subaccount code.</li>
     *   <li>Creates one combo symbol for every unordered pair of exchanges.</li>
     *   <li>Writes the record header, then subscribes to trade reports and market data.</li>
     * </ol>
     */
    private TwoArbitrage(){
        // Market-data gateway.
        ManagedChannel marketChannel = ManagedChannelBuilder.forAddress(host, mgport).usePlaintext().build();
        mgStub = MarketDataManageGrpc.newStub(marketChannel);

        // Log in over the trading channel to obtain a token.
        ManagedChannel tradeChannel = ManagedChannelBuilder.forAddress(host, tgport).usePlaintext().build();
        LoginOuterClass.ReqLogin loginRequest = LoginOuterClass.ReqLogin.newBuilder()
                .setUsername(username)
                .setPassword(password)
                .build();
        String token = LoginGrpc.newBlockingStub(tradeChannel).login(loginRequest).getToken();

        // Trading and query stubs all carry the token as call credentials.
        tgStub = OrderReportingGrpc.newStub(tradeChannel).withCallCredentials(new JwtCallCredential(token));
        queryStub = QueryServiceGrpc.newStub(tradeChannel).withCallCredentials(new JwtCallCredential(token));
        queryBlockingStub = QueryServiceGrpc.newBlockingStub(tradeChannel).withCallCredentials(new JwtCallCredential(token));

        // One symbol (same currency pair) per exchange.
        for (String exchange : exchanges) {
            symbolMap.put(exchange, Common.Symbol.newBuilder()
                    .setBaseCurrency(baseCurrency)
                    .setQuoteCurrency(quoteCurrency)
                    .setExchangeType(exchange)
                    .build());
        }

        // Initial balances and subaccount codes for the configured exchanges.
        Orderreporting.SubaccountRequest subaccountRequest = Orderreporting.SubaccountRequest.newBuilder().build();
        Orderreporting.SubaccountResponse subaccounts = queryBlockingStub.getSubaccounts(subaccountRequest);
        for (Orderreporting.Subaccount subaccount : subaccounts.getDataList()) {
            String exchange = subaccount.getExchangeType();
            if (!exchanges.contains(exchange)) {
                continue;
            }
            Map<String, BigDecimal> balances = new HashMap<>();
            for (Orderreporting.AssetBalance assetBalance : subaccount.getAssetBalanceList()) {
                String asset = assetBalance.getAsset();
                if (asset.equals(baseCurrency) || asset.equals(quoteCurrency)) {
                    balances.put(asset, new BigDecimal(assetBalance.getFree()));
                }
            }
            // Currencies the subaccount did not report start at zero.
            for (String currency : Arrays.asList(baseCurrency, quoteCurrency)) {
                if (!balances.containsKey(currency)) {
                    balances.put(currency, BigDecimal.ZERO);
                }
            }
            lastCurrencyMap.put(exchange, balances);
            subaccountMap.put(exchange, subaccount.getSubaccountCode());
        }

        // No quote has been received yet for any exchange.
        for (String exchange : exchanges) {
            bidAsk1Map.put(exchange, null);
        }

        // Every unordered pair of exchanges, keyed "first/second" in list order.
        int exchangeCount = exchanges.size();
        for (int first = 0; first < exchangeCount - 1; first++) {
            String base = exchanges.get(first);
            for (String quote : exchanges.subList(first + 1, exchangeCount)) {
                comboSymbolMap.put(base + "/" + quote, new ComboSymbol(base, quote));
            }
        }

        initRecord();
        subTrade();
        subMarketData();
    }
    /**
     * Subscribes to trade reports and applies each fill to {@code lastCurrencyMap}.
     *
     * <p>Only fills whose quantity equals the configured order {@code quantity} are applied
     * (partial fills are ignored). For a buy, the base balance grows by quantity minus commission
     * and the quote balance shrinks by quantity times price; for a sell, the base balance shrinks
     * by quantity and the quote balance grows by quantity times price minus commission.
     *
     * <p>Trades for an exchange without an entry in {@code lastCurrencyMap} are logged and
     * skipped; previously they raised a NullPointerException inside the callback.
     */
    private void subTrade(){
        // Subscribe to trade reports.
        StreamObserver<Orderreporting.ReqSubTrade> reqSubTradeStreamObserver = tgStub.subTrade(new StreamObserver<Orderreporting.RtnSubTrade>() {
            @Override
            public void onNext(Orderreporting.RtnSubTrade rtnSubTrade) {
                String exchange = rtnSubTrade.getTrade().getSymbol().getExchangeType();
                BigDecimal orderQuantity = new BigDecimal(quantity);

                // Only complete fills of the strategy's fixed order size are considered.
                if (new BigDecimal(rtnSubTrade.getTrade().getQuantity()).compareTo(orderQuantity) != 0) {
                    return;
                }

                boolean isBuy = rtnSubTrade.getTrade().getSide().equals("buy");
                boolean isSell = rtnSubTrade.getTrade().getSide().equals("sell");
                if (!isBuy && !isSell) {
                    return;
                }
                System.out.println(isBuy ? "买单已成交" : "卖单已成交");

                // The subscription is not filtered by exchange, so a report may concern an
                // exchange for which no balances were loaded at start-up.
                Map<String, BigDecimal> balanceMap = lastCurrencyMap.get(exchange);
                if (balanceMap == null) {
                    System.out.println("No tracked balances for exchange " + exchange + ", trade ignored");
                    return;
                }

                BigDecimal price = new BigDecimal(rtnSubTrade.getTrade().getPrice());
                BigDecimal commission = new BigDecimal(rtnSubTrade.getTrade().getCommission());
                BigDecimal notional = orderQuantity.multiply(price);
                BigDecimal baseBalance = balanceMap.get(baseCurrency);
                BigDecimal quoteBalance = balanceMap.get(quoteCurrency);

                if (isBuy) {
                    // Commission is deducted from the base currency received.
                    balanceMap.put(baseCurrency, baseBalance.add(orderQuantity.subtract(commission)));
                    balanceMap.put(quoteCurrency, quoteBalance.subtract(notional));
                } else {
                    // Commission is deducted from the quote currency received.
                    balanceMap.put(baseCurrency, baseBalance.subtract(orderQuantity));
                    balanceMap.put(quoteCurrency, quoteBalance.add(notional.subtract(commission)));
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {

            }
        });
        // Do not drop this call: it sends the actual subscription request to the service.
        reqSubTradeStreamObserver.onNext(Orderreporting.ReqSubTrade.newBuilder().build());
        System.out.println("订阅了所有交易所的成交回报" );

    }

    /**
     * Subscribes to order-book depth for every configured exchange and feeds each update into
     * the combo symbols.
     *
     * <p>For every successful depth message the best bid and best ask are turned into a
     * {@link BidAsk1}, stored in {@code bidAsk1Map}, and pushed into each combo symbol that has
     * the message's exchange as one of its legs. A leg is replaced only when the new snapshot
     * differs from the one it currently holds, and one row is recorded per message, after all
     * affected combos have been updated, if at least one leg changed.
     *
     * <p>Messages with an empty bid or ask side are skipped.
     */
    private void subMarketData(){
        StreamObserver<MarketGateway.ReqSubDepth> requestObserver = mgStub.subDepth(new StreamObserver<MarketGateway.DepthsDataResponse>() {
            @Override
            public void onNext(MarketGateway.DepthsDataResponse depthsDataResponse) {
                if (!depthsDataResponse.getCRsp().getIsSuccess()){
                    return;
                }
                MarketGateway.DepthsData depthsData = depthsDataResponse.getData();
                // Without at least one level on each side there is no bid1/ask1 to read.
                if (depthsData.getBidsCount() == 0 || depthsData.getAsksCount() == 0) {
                    return;
                }
                String exchange = depthsData.getSymbol().getExchangeType();
                BidAsk1 bidAsk1 = new BidAsk1(exchange, new BigDecimal(depthsData.getBids(0).getPrice()),
                        new BigDecimal(depthsData.getBids(0).getQuantity()), new BigDecimal(depthsData.getAsks(0).getPrice()),
                        new BigDecimal(depthsData.getAsks(0).getQuantity()), depthsData.getTimestamp());
                System.out.println(bidAsk1);
                bidAsk1Map.put(exchange, bidAsk1);

                boolean anyLegChanged = false;
                for (ComboSymbol comboSymbol : comboSymbolMap.values()) {
                    // Pick the leg that belongs to this exchange; match on the exact exchange
                    // name rather than on a substring of the map key.
                    BidAsk1 currentLeg;
                    if (exchange.equals(comboSymbol.getBaseExchange())) {
                        currentLeg = comboSymbol.getBaseBidAsk1();
                    } else if (exchange.equals(comboSymbol.getQuoteExchange())) {
                        currentLeg = comboSymbol.getQuoteBidAsk1();
                    } else {
                        continue;
                    }
                    // Compare against that leg only: a snapshot can never equal the other leg,
                    // because the exchanges differ.
                    if (!bidAsk1.equals(currentLeg)) {
                        comboSymbol.update(exchange, bidAsk1);
                        anyLegChanged = true;
                    }
                }
                // Record once, after every affected combo holds the new snapshot.
                if (anyLegChanged) {
                    record();
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {

            }
        });
        for (Map.Entry<String, Common.Symbol> entry : symbolMap.entrySet()) {
            System.out.println("订阅了" + entry.getValue().getExchangeType() + "交易所的行情" );
            MarketGateway.ReqSubDepth reqSubDepth = MarketGateway.ReqSubDepth.newBuilder().setSymbol(entry.getValue()).build();
            requestObserver.onNext(reqSubDepth);
        }
    }

    /**
     * Adds the header row to {@code records}: four column titles per combo symbol
     * ("base / quote", "quote / base", "base timestamp", "quote timestamp"), in the iteration
     * order of {@code comboSymbolMap}.
     */
    private void initRecord() {
        List<String> header = new ArrayList<>();
        for (ComboSymbol comboSymbol : comboSymbolMap.values()) {
            String base = comboSymbol.getBaseExchange();
            String quote = comboSymbol.getQuoteExchange();
            header.addAll(Arrays.asList(
                    base + " / " + quote,
                    quote + " / " + base,
                    base + " timestamp",
                    quote + " timestamp"));
        }
        records.add(header);
    }

    /**
     * Appends one data row to {@code records} describing the current state of every combo symbol.
     *
     * <p>Each combo contributes four cells, in the same order as the header written by
     * {@code initRecord}: buy spread, sell spread (both with five decimals), and the formatted
     * timestamps of the base and quote snapshots. A combo whose two legs are not both known yet
     * contributes four empty cells, so later columns stay aligned with the header.
     */
    private void record() {
        SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.SSS");
        List<String> row = new ArrayList<>();
        for (ComboSymbol comboSymbol : comboSymbolMap.values()) {
            BigDecimal buySpread = comboSymbol.getBuySpread();
            if (buySpread == null) {
                // Combo not formed yet: keep its four columns, but leave them empty.
                row.addAll(Collections.nCopies(4, ""));
                continue;
            }
            row.add(buySpread.setScale(5, RoundingMode.HALF_EVEN).toString());
            row.add(comboSymbol.getSellSpread().setScale(5, RoundingMode.HALF_EVEN).toString());
            row.add(sdf.format(new Date(comboSymbol.getBaseBidAsk1().getTimestamp())));
            row.add(sdf.format(new Date(comboSymbol.getQuoteBidAsk1().getTimestamp())));
        }
        records.add(row);
    }


    /**
     * Looks at the latest spreads of every combo symbol and decides whether to trade.
     *
     * <p>A combo with a buy spread of at least 4 triggers a "sell" pair order; otherwise a sell
     * spread of at most -4 triggers a "buy" pair order. Combos without both legs are skipped.
     */
    private void handleMd() {
        final BigDecimal sellTrigger = new BigDecimal("4");
        final BigDecimal buyTrigger = new BigDecimal("-4");

        for (ComboSymbol comboSymbol : comboSymbolMap.values()) {
            if (comboSymbol == null) {
                continue;
            }
            BigDecimal buySpread = comboSymbol.getBuySpread();
            if (buySpread == null) {
                continue;
            }

            String direction;
            if (buySpread.compareTo(sellTrigger) >= 0) {
                direction = "sell";
            } else if (comboSymbol.getSellSpread().compareTo(buyTrigger) <= 0) {
                direction = "buy";
            } else {
                continue;
            }

            System.out.println(comboSymbol);
            insertPairOrders(comboSymbol, direction);
        }
    }

    /**
     * Sends the two limit orders that make up one combo trade and registers them in
     * {@code comboOrders}.
     *
     * <ul>
     *   <li>{@code "sell"}: sell on the base exchange at its bid1 price, buy on the quote
     *       exchange at its ask1 price.</li>
     *   <li>{@code "buy"}: buy on the base exchange at its ask1 price, sell on the quote
     *       exchange at its bid1 price.</li>
     * </ul>
     * Any other direction is ignored, and nothing is sent when either exchange has no
     * subaccount code.
     *
     * <p>NOTE(review): the combo order is added to {@code comboOrders} only after both requests
     * have been sent, so a response that arrives earlier will not find it there.
     *
     * @param comboSymbol exchange pair to trade; both legs must hold a snapshot
     * @param direction   {@code "buy"} or {@code "sell"}
     */
    private void insertPairOrders(ComboSymbol comboSymbol, String direction){
        String baseExchange = comboSymbol.getBaseExchange();
        String quoteExchange = comboSymbol.getQuoteExchange();

        String baseSide;
        String quoteSide;
        BigDecimal basePrice;
        BigDecimal quotePrice;
        if ("sell".equals(direction)) {
            // Hit the first leg's bid1, lift the second leg's ask1.
            baseSide = "sell";
            basePrice = comboSymbol.getBaseBidAsk1().getBid1Price();
            quoteSide = "buy";
            quotePrice = comboSymbol.getQuoteBidAsk1().getAsk1Price();
        } else if ("buy".equals(direction)) {
            // Lift the first leg's ask1, hit the second leg's bid1.
            baseSide = "buy";
            basePrice = comboSymbol.getBaseBidAsk1().getAsk1Price();
            quoteSide = "sell";
            // Price of the second leg is its bid1 price (the bid1 quantity was used here before).
            quotePrice = comboSymbol.getQuoteBidAsk1().getBid1Price();
        } else {
            return;
        }

        String baseSubaccountCode = subaccountMap.get(baseExchange);
        String quoteSubaccountCode = subaccountMap.get(quoteExchange);
        if (baseSubaccountCode == null || quoteSubaccountCode == null) {
            System.out.println("No subaccount for " + baseExchange + " or " + quoteExchange + ", pair order skipped");
            return;
        }

        Orderreporting.InsertOrderRequest insertOrderRequest1 =
                buildLimitOrder(baseSubaccountCode, symbolMap.get(baseExchange), basePrice, baseSide);
        Orderreporting.InsertOrderRequest insertOrderRequest2 =
                buildLimitOrder(quoteSubaccountCode, symbolMap.get(quoteExchange), quotePrice, quoteSide);

        String orderRef1 = insertOrder(insertOrderRequest1);
        String orderRef2 = insertOrder(insertOrderRequest2);
        comboOrders.add(new ComboOrder(comboSymbol, orderRef1, orderRef2, "sendout", "sendout", direction));
    }

    /** Builds a limit order request for the configured order quantity. */
    private Orderreporting.InsertOrderRequest buildLimitOrder(String subaccountCode, Common.Symbol symbol,
                                                              BigDecimal price, String side) {
        return Orderreporting.InsertOrderRequest.newBuilder()
                .setSubaccountCode(subaccountCode)
                .setSymbol(symbol)
                .setQuantity(quantity)
                // Plain notation: toString() may switch to an exponent form for some values.
                .setPrice(price.toPlainString())
                .setType("limit")
                .setSide(side)
                .build();
    }

    /**
     * Sends one order request asynchronously and returns a locally generated reference for it.
     *
     * <p>When the response reports success, every combo order in {@code comboOrders} that holds
     * this reference gets the matching leg status set to "submitted". Failures are only printed.
     *
     * @param insertOrderRequest request to send through the trading gateway
     * @return a random UUID without dashes identifying this request locally
     */
    private String insertOrder(Orderreporting.InsertOrderRequest insertOrderRequest) {
        final String orderRef = UUID.randomUUID().toString().replace("-", "");

        StreamObserver<Orderreporting.InsertOrderResponse> responseHandler =
                new StreamObserver<Orderreporting.InsertOrderResponse>() {
            @Override
            public void onNext(Orderreporting.InsertOrderResponse value) {
                if (!value.getCRsp().getIsSuccess()) {
                    System.out.println("报单失败: " + value.getCRsp().getErrMsg());
                    // TODO resubmit the order
                    return;
                }
                // Order accepted: mark the leg that carries this reference.
                System.out.println("报单成功：" + value.getOrderId());
                for (ComboOrder comboOrder : comboOrders) {
                    if (comboOrder.getOrderRef1().equals(orderRef)) {
                        comboOrder.setStatus1("submitted");
                        continue;
                    }
                    if (comboOrder.getOrderRef2().equals(orderRef)) {
                        comboOrder.setStatus2("submitted");
                    }
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("报单失败: " + t.getMessage());
                // TODO resubmit the order
            }

            @Override
            public void onCompleted() {

            }
        };

        tgStub.insertOrder(insertOrderRequest, responseHandler);
        return orderRef;
    }
//
//    /**
//     * 撤单，收到响应后，将订单从orderMap中移除
//     * @param orderId 订单ID
//     */
//    private void cancel(String orderId) {
//        if(orderId.equals("placeHolderOrderId")) //报单还未受理，不需要撤单
//            return;
//        final Orderreporting.CancelOrderRequest cancelOrderRequest = Orderreporting.CancelOrderRequest.newBuilder().setOrderId(orderId).build();
//        tgStub.cancelOrder(cancelOrderRequest, new StreamObserver<Orderreporting.CancelOrderResponse>() {
//            @Override
//            public void onNext(Orderreporting.CancelOrderResponse value) {
//                if(value.getCRsp().getIsSuccess()){
//                    if(value.getData().getOrderId().equals(orderMap.get("buy"))){
//                        System.out.println("取消买单成功");
//                        orderMap.remove("buy");
//                    }else if(value.getData().getOrderId().equals(orderMap.get("sell"))){
//                        System.out.println("取消卖单成功");
//                        orderMap.remove("sell");
//                    }
//                }else{
//                    System.out.println("撤单失败： " + value.getCRsp().getErrMsg());
//                }
//            }
//
//            @Override
//            public void onError(Throwable t) {
//                t.printStackTrace();
//            }
//
//            @Override
//            public void onCompleted() {
//
//            }
//        });
//    }
//

    /**
     * Writes {@code data} to {@code path} as UTF-8 text, one line per row, with a comma after
     * every cell (including the last one of a row).
     *
     * <p>The writer is closed even when writing fails. Any exception is printed and swallowed,
     * so the method never throws.
     *
     * @param data rows to write; each inner list is one line
     * @param path file to create or overwrite
     */
    public static void Array2CSV(List<List<String>> data, String path)
    {
        try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path), "UTF-8"))) {
            // Index-based on purpose: the caller in this file passes a list that stream callbacks
            // may still be appending to, and an iterator would fail on such a modification.
            for (int i = 0; i < data.size(); i++)
            {
                for (String cell : data.get(i))
                {
                    out.write(cell);
                    out.write(",");
                }
                out.newLine();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }


}
